Kotlin Flow 背压和线程切换竟然如此相似 | 您所在的位置:网站首页 › Golang 协程切换的时机 › Kotlin Flow 背压和线程切换竟然如此相似 |
前言
协程系列文章: 一个小故事讲明白进程、线程、Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotlin 协程的挂起/恢复没那么神秘(原理篇) Kotlin 协程调度切换线程是时候解开真相了 Kotlin 协程之线程池探索之旅(与Java线程池PK) Kotlin 协程之取消与异常处理探索之旅(上) Kotlin 协程之取消与异常处理探索之旅(下) 来,跟我一起撸Kotlin runBlocking/launch/join/async/delay 原理&使用 继续来,同我一起撸Kotlin Channel 深水区 Kotlin 协程 Select:看我如何多路复用 Kotlin Sequence 是时候派上用场了 Kotlin Flow啊,你将流向何方? Kotlin Flow 背压和线程切换竟然如此相似 Kotlin SharedFlow&StateFlow 热流到底有多热? 狂飙吧,Lifecycle与协程、Flow的化学反应 来吧!接受Kotlin 协程--线程池的7个灵魂拷问 当,Kotlin Flow与Channel相逢 这一次,让Kotlin Flow 操作符真正好用起来上篇分析了Kotlin Flow原理,大部分操作符实现比较简单,相较而言背压和线程切换比较复杂,遗憾的是,纵观网上大部分文章,关于Flow背压和协程切换这块的原理说得比较少,语焉不详,鉴于此,本篇重点分析两者的原理及使用。 通过本篇文章,你将了解到: 什么是背压? 如何处理背压? Flow buffer的原理 Flow 线程切换的使用 Flow 线程切换的原理 1. 什么是背压?先看自然界的水流: 当上游的流速大于下游的流速,日积月累,最终导致大坝溢出,此种现象称为背压的出现 而对于Kotlin里的Flow,也有上游(生产者)、下游(消费者)的概念,如: suspend fun testBuffer1() { var flow = flow { //生产者 (1..3).forEach { println("emit $it") emit(it) } } flow.collect { //消费者 println("collect:$it") } } 复制代码通过collect操作符触发了流,从生产者生产数据(flow闭包),到消费者接收并处理数据(collect闭包),这就完成了流从上游到下游的一次流动过程。 2. 如何处理背压?模拟一个生产者消费者速度不一致的场景: suspend fun testBuffer3() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it") emit(it) } } var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it") } } println("use time:${time} ms") } 复制代码计算流从生产到消费的整个时间: 显而易见,消费者影响了生产者的速度,这种情况下该怎么优化呢? 最简单的解决方案: 生产者和消费者分别在不同的线程执行 如: suspend fun testBuffer4() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it in thread:${Thread.currentThread()}") emit(it) } }.flowOn(Dispatchers.IO) var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it in thread:${Thread.currentThread()}") } } println("use time:${time} ms") } 复制代码添加了flowOn()函数,它的存在使得它前面的代码在指定的线程里执行,如flow闭包了的代码都在IO线程执行,也就是生产者在IO线程执行。 而消费者在当前线程执行,因此两者无需相互等待,节省了总时间: 确实是减少了时间,提升了效率。但我们知道开启线程代价还是挺大的,既然都在协程里运行了,能否借助协程的特性:协程挂起不阻塞线程 来完成此事呢? 此时,Buffer出场了,先看看它是如何表演的: suspend fun testBuffer5() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it in thread:${Thread.currentThread()}") emit(it) } }.buffer(5) var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it in thread:${Thread.currentThread()}") } } println("use time:${time} ms") } 复制代码这次没有使用flowOn,取而代之的是buffer。 运行结果如下: 先看看没有buffer时的耗时: suspend fun testBuffer3() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it") emit(it) } } var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it") } } println("use time:${time} ms") } 复制代码从collect开始,依次执行flow闭包,通过emit调用到collect闭包,因为flow闭包里包含了几次emit,因此整个流程会有几次发射。 如上图,从步骤1到步骤8,因为是在同一个线程里,因此是串行执行的,整个流的耗时即为生产者到消费者(步骤1~步骤8)的耗时。 有buffer在没看源码之前,我们先猜测一下它的流程: 每次emit都发送到buffer里,然后立刻回来继续发送,如此一来生产者没有被消费者的速度拖累。 而消费者会检测Buffer里是否有数据,有则取出来。 根据之前的经验我们知道:collect调用到emit最后到buffer是线性调用的,放入buffer后继续循环emit,那么问题来了: 是谁触发了collect闭包的调用呢? 接下来深入源码,探究答案。 buffer源码流程分析创建Flow public fun Flow.buffer(capacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow { var capacity = capacity//buffer容量 var onBufferOverflow = onBufferOverflow//buffer满之后的处理策略 if (capacity == Channel.CONFLATED) { capacity = 0 onBufferOverflow = BufferOverflow.DROP_OLDEST } // create a flow return when (this) { is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow) //走else 分支,构造ChannelFlowOperatorImpl else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow) } } 复制代码buffer 返回Flow实例,其间涉及几个重要的类和函数: 调用collect 当调用Flow.collect时: public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector { override suspend fun emit(value: T) = action(value) }) 复制代码构造了匿名内部类FlowCollector,并实现了emit方法,它的实现为collect的闭包。 调用ChannelFlowOperatorImpl.collect最终会调用ChannelFlow.collect: override suspend fun collect(collector: FlowCollector): Unit = coroutineScope { collector.emitAll(produceImpl(this)) } public open fun produceImpl(scope: CoroutineScope): ReceiveChannel = scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun) 复制代码produceImpl 创建了Channel,内部开启了协程,返回ReceiveChannel。 再来看emitAll函数: private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, consume: Boolean) { ensureActive() var cause: Throwable? = null try { while (true) { //挂起等待Channel数据 val result = run { channel.receiveCatching() } if (result.isClosed) { //Channel关闭后才会退出循环 result.exceptionOrNull()?.let { throw it } break // returns normally when result.closeCause == null } //发送数据 emit(result.getOrThrow()) } } catch (e: Throwable) { cause = e throw e } finally { if (consume) channel.cancelConsumed(cause) } } 复制代码Channel此时并没有数据,因此协程会挂起等待。 Channel发送 Channel什么时候有数据呢?当然是在调用了Channel.send()函数后。 前面提到过collect之后开启了协程: public open fun produceImpl(scope: CoroutineScope): ReceiveChannel = scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun) internal val collectToFun: suspend (ProducerScope) -> Unit get() = { collectTo(it) } protected override suspend fun collectTo(scope: ProducerScope) = flowCollect(SendingCollector(scope)) 复制代码此时传入的参数为:collectToFun,最后构造了: public class SendingCollector( private val channel: SendChannel ) : FlowCollector { override suspend fun emit(value: T): Unit = channel.send(value) } 复制代码当协程得到执行时,会调用collectToFun-->collectTo(it)-->flowCollect(SendingCollector(scope)),最终调用到: #ChannelFlowOperatorImpl override suspend fun flowCollect(collector: FlowCollector) = flow.collect(collector) 复制代码而该flow为最开始的flow,collector为SendingCollector。 flow.collect后会调用到flow的闭包,进而调用到emit函数: private fun emit(uCont: Continuation, value: T): Any? { val currentContext = uCont.context currentContext.ensureActive() //... completion = uCont return emitFun(collector as FlowCollector, value, this as Continuation) } 复制代码emitFun本质上会调用collector里的emit函数,而此时的collector即为SendingCollector,最后调用channel.send(value) 如此一来,Channel就将数据发送出去了,此时channel.receiveCatching()被唤醒,接下来执行emit(result.getOrThrow()),这函数最后会流转到最初始的collect的闭包里。 上面的分析即为生产者到消费者的流转过程,单看源码可能比较乱,看图解惑: Flow buffer的本质上是利用了Channel进行数据的发送和接收 buffer为啥能提升效率前面分析过无buffer时生产者消费者的流程图,作为对比,我们也将加入buffer后生产者消费者的流程图。 由此可见,总共花费了7s。 至此,我们找到了buffer能够提高效率的原因: 生产者、消费者运行在不同的协程,挂起操作不阻塞对方 抛出一个比较有意思的问题:以下代码加buffer之后效率会有提升吗? suspend fun testBuffer6() { var flow = flow { (1..3).forEach { println("emit $it") emit(it) } } var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it") } } println("use time:${time} ms") } 复制代码在未实验之前,如果你已经有答案,恭喜你已经弄懂了buffer的本质。 4. Flow 线程切换的使用 suspend fun testBuffer4() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it in thread:${Thread.currentThread()}") emit(it) } }.flowOn(Dispatchers.IO) var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it in thread:${Thread.currentThread()}") } } println("use time:${time} ms") } 复制代码flowOn(Dispatchers.IO)表示其之前的操作符(函数)都在IO线程执行,如这里的意思是flow闭包里的代码在IO线程执行。 而其之后的操作符(函数)在当前的线程执行。 通常用在子线程里获取网络数据(flow闭包),然后再collect闭包里(主线程)更新UI。 5. Flow 线程切换的原理 public fun Flow.flowOn(context: CoroutineContext): Flow { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context) } } 复制代码看到这你可能已经有答案了:这不就和buffer一样的方式吗? 但仔细看,此处多了个上下文:CoroutineContext。 CoroutineContext的作用就是用来决定协程运行在哪个线程。 前面分析的buffer时,我们的协程的作用域是runBlocking,即使生产者、消费者在不同的协程,但是它们始终在同一个线程里执行。 而使用了flowOn指定线程,此时生产者、消费者在不同的线程运行协程。 因此,只要弄懂了buffer原理,flowOn原理自然而然就懂了。 以上为Flow背压和线程切换的全部内容,下篇将分析Flow的热流。 本文基于Kotlin 1.5.3,文中完整Demo请点击 您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力 持续更新中,和我一起步步为营系统、深入学习Android/Kotlin1、Android各种Context的前世今生 2、Android DecorView 必知必会 3、Window/WindowManager 不可不知之事 4、View Measure/Layout/Draw 真明白了 5、Android事件分发全套服务 6、Android invalidate/postInvalidate/requestLayout 彻底厘清 7、Android Window 如何确定大小/onMeasure()多次执行原因 8、Android事件驱动Handler-Message-Looper解析 9、Android 键盘一招搞定 10、Android 各种坐标彻底明了 11、Android Activity/Window/View 的background 12、Android Activity创建到View的显示过 13、Android IPC 系列 14、Android 存储系列 15、Java 并发系列不再疑惑 16、Java 线程池系列 17、Android Jetpack 前置基础系列 18、Android Jetpack 易学易懂系列 19、Kotlin 轻松入门系列 20、Kotlin 协程系列全面解读 |
CopyRight 2018-2019 实验室设备网 版权所有 |